/*
 * Copyright 2014 LinkedIn, Inc
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package azkaban.executor;

import azkaban.jobExecutor.ProcessJob;
import azkaban.utils.Props;
import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Layout;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;

public class JavaJobRunnerMain {

  public static final String JOB_CLASS = "job.class";
  public static final String DEFAULT_RUN_METHOD = "run";
  public static final String DEFAULT_CANCEL_METHOD = "cancel";

  // This is the Job interface method to get the properties generated by the
  // job.
  public static final String GET_GENERATED_PROPERTIES_METHOD =
      "getJobGeneratedProperties";

  public static final String CANCEL_METHOD_PARAM = "method.cancel";
  public static final String RUN_METHOD_PARAM = "method.run";
  public static final String[] PROPS_CLASSES = new String[]{
      "azkaban.utils.Props", "azkaban.common.utils.Props"};

  private static final Layout DEFAULT_LAYOUT = new PatternLayout("%p %m\n");

  public final Logger _logger;

  public String _cancelMethod;
  public String _jobName;
  public Object _javaObject;
  private boolean _isFinished = false;

  public JavaJobRunnerMain() throws Exception {
    Runtime.getRuntime().addShutdownHook(new Thread() {
      @Override
      public void run() {
        cancelJob();
      }
    });

    try {
      this._jobName = System.getenv(ProcessJob.JOB_NAME_ENV);
      final String propsFile = System.getenv(ProcessJob.JOB_PROP_ENV);

      this._logger = Logger.getRootLogger();
      this._logger.removeAllAppenders();
      final ConsoleAppender appender = new ConsoleAppender(DEFAULT_LAYOUT);
      appender.activateOptions();
      this._logger.addAppender(appender);

      final Properties prop = new Properties();
      prop.load(Files.newBufferedReader(Paths.get(propsFile), StandardCharsets.UTF_8));

      this._logger.info("Running job " + this._jobName);
      final String className = prop.getProperty(JOB_CLASS);
      if (className == null) {
        throw new Exception("Class name is not set.");
      }
      this._logger.info("Class name " + className);

      // Create the object using proxy

      this._javaObject = getObject(this._jobName, className, prop, this._logger);

      if (this._javaObject == null) {
        this._logger.info("Could not create java object to run job: " + className);
        throw new Exception("Could not create running object");
      }

      this._cancelMethod =
          prop.getProperty(CANCEL_METHOD_PARAM, DEFAULT_CANCEL_METHOD);

      final String runMethod =
          prop.getProperty(RUN_METHOD_PARAM, DEFAULT_RUN_METHOD);
      this._logger.info("Invoking method " + runMethod);

      this._logger.info("Proxy check failed, not proxying run.");
      runMethod(this._javaObject, runMethod);

      this._isFinished = true;

      // Get the generated properties and store them to disk, to be read
      // by ProcessJob.
      try {
        final Method generatedPropertiesMethod =
            this._javaObject.getClass().getMethod(GET_GENERATED_PROPERTIES_METHOD,
                new Class<?>[]{});
        final Object outputGendProps =
            generatedPropertiesMethod.invoke(this._javaObject, new Object[]{});
        if (outputGendProps != null) {
          final Method toPropertiesMethod =
              outputGendProps.getClass().getMethod("toProperties",
                  new Class<?>[]{});
          final Properties properties =
              (Properties) toPropertiesMethod.invoke(outputGendProps,
                  new Object[]{});

          final Props outputProps = new Props(null, properties);
          outputGeneratedProperties(outputProps);
        } else {
          outputGeneratedProperties(new Props());
        }

      } catch (final NoSuchMethodException e) {
        this._logger
            .info(String
                .format(
                    "Apparently there isn't a method[%s] on object[%s], using empty Props object instead.",
                    GET_GENERATED_PROPERTIES_METHOD, this._javaObject));
        outputGeneratedProperties(new Props());
      }
    } catch (final Exception e) {
      this._isFinished = true;
      throw e;
    }
  }

  public static void main(final String[] args) throws Exception {
    final JavaJobRunnerMain wrapper = new JavaJobRunnerMain();
  }

  private static Object getObject(final String jobName, final String className,
      final Properties properties, final Logger logger) throws Exception {

    final Class<?> runningClass =
        JavaJobRunnerMain.class.getClassLoader().loadClass(className);

    if (runningClass == null) {
      throw new Exception("Class " + className
          + " was not found. Cannot run job.");
    }

    Class<?> propsClass = null;
    for (final String propClassName : PROPS_CLASSES) {
      try {
        propsClass =
            JavaJobRunnerMain.class.getClassLoader().loadClass(propClassName);
      } catch (final ClassNotFoundException e) {
      }

      if (propsClass != null
          && getConstructor(runningClass, String.class, propsClass) != null) {
        // is this the props class
        break;
      }
      propsClass = null;
    }

    Object obj = null;
    if (propsClass != null
        && getConstructor(runningClass, String.class, propsClass) != null) {
      // Create props class
      final Constructor<?> propsCon =
          getConstructor(propsClass, propsClass, Properties[].class);
      final Object props =
          propsCon.newInstance(null, new Properties[]{properties});

      final Constructor<?> con =
          getConstructor(runningClass, String.class, propsClass);
      logger.info("Constructor found " + con.toGenericString());
      obj = con.newInstance(jobName, props);
    } else if (getConstructor(runningClass, String.class, Properties.class) != null) {

      final Constructor<?> con =
          getConstructor(runningClass, String.class, Properties.class);
      logger.info("Constructor found " + con.toGenericString());
      obj = con.newInstance(jobName, properties);
    } else if (getConstructor(runningClass, String.class, Map.class) != null) {
      final Constructor<?> con =
          getConstructor(runningClass, String.class, Map.class);
      logger.info("Constructor found " + con.toGenericString());

      final HashMap<Object, Object> map = new HashMap<>();
      for (final Map.Entry<Object, Object> entry : properties.entrySet()) {
        map.put(entry.getKey(), entry.getValue());
      }
      obj = con.newInstance(jobName, map);
    } else if (getConstructor(runningClass, String.class) != null) {
      final Constructor<?> con = getConstructor(runningClass, String.class);
      logger.info("Constructor found " + con.toGenericString());
      obj = con.newInstance(jobName);
    } else if (getConstructor(runningClass) != null) {
      final Constructor<?> con = getConstructor(runningClass);
      logger.info("Constructor found " + con.toGenericString());
      obj = con.newInstance();
    } else {
      logger.error("Constructor not found. Listing available Constructors.");
      for (final Constructor<?> c : runningClass.getConstructors()) {
        logger.info(c.toGenericString());
      }
    }
    return obj;
  }

  private static Constructor<?> getConstructor(final Class<?> c, final Class<?>... args) {
    try {
      final Constructor<?> cons = c.getConstructor(args);
      return cons;
    } catch (final NoSuchMethodException e) {
      return null;
    }
  }

  private void runMethod(final Object obj, final String runMethod)
      throws IllegalAccessException, InvocationTargetException,
      NoSuchMethodException {
    obj.getClass().getMethod(runMethod, new Class<?>[]{}).invoke(obj);
  }

  private void outputGeneratedProperties(final Props outputProperties) {

    if (outputProperties == null) {
      this._logger.info("  no gend props");
      return;
    }
    for (final String key : outputProperties.getKeySet()) {
      this._logger
          .info("  gend prop " + key + " value:" + outputProperties.get(key));
    }

    final String outputFileStr = System.getenv(ProcessJob.JOB_OUTPUT_PROP_FILE);
    if (outputFileStr == null) {
      return;
    }

    this._logger.info("Outputting generated properties to " + outputFileStr);

    final Map<String, String> properties = new LinkedHashMap<>();
    for (final String key : outputProperties.getKeySet()) {
      properties.put(key, outputProperties.get(key));
    }

    OutputStream writer = null;
    try {
      writer = new BufferedOutputStream(new FileOutputStream(outputFileStr));

      // Manually serialize into JSON instead of adding org.json to
      // external classpath. Reduces one dependency for something that's
      // essentially easy.
      writer.write("{\n".getBytes(StandardCharsets.UTF_8));
      for (final Map.Entry<String, String> entry : properties.entrySet()) {
        writer.write(String.format("  \"%s\":\"%s\",\n",
            entry.getKey().replace("\"", "\\\\\""),
            entry.getValue().replace("\"", "\\\\\"")).getBytes(StandardCharsets.UTF_8));
      }
      writer.write("}".getBytes(StandardCharsets.UTF_8));
    } catch (final Exception e) {
      throw new RuntimeException("Unable to store output properties to: "
          + outputFileStr);
    } finally {
      try {
        if (writer != null) {
          writer.close();
        }
      } catch (final IOException e) {
      }
    }
  }

  public void cancelJob() {
    if (this._isFinished) {
      return;
    }
    this._logger.info("Attempting to call cancel on this job");
    if (this._javaObject != null) {
      Method method = null;

      try {
        method = this._javaObject.getClass().getMethod(this._cancelMethod);
      } catch (final SecurityException e) {
      } catch (final NoSuchMethodException e) {
      }

      if (method != null) {
        try {
          method.invoke(this._javaObject);
        } catch (final Exception e) {
          if (this._logger != null) {
            this._logger.error("Cancel method failed! ", e);
          }
        }
      } else {
        throw new RuntimeException("Job " + this._jobName
            + " does not have cancel method " + this._cancelMethod);
      }
    }
  }

}
